package com.huan.study.flink.lambda;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

/**
 * Word-count demo showing how Java lambda expressions are used with the Flink DataSet API.
 *
 * <p>Using lambdas with Flink is discouraged: Flink frequently cannot infer the output type of a
 * lambda (Java type erasure removes the generic information), so the operators below declare
 * their result types explicitly with {@code returns(...)}.
 *
 * <p>Reference: https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html
 *
 * <p>NOTE(review): {@code ExecutionEnvironment}/DataSet is deprecated in recent Flink releases —
 * confirm the targeted Flink version before extending this example.
 *
 * @author huan.fu
 * @date 2019-11-17 - 13:57
 */
public class FlinkLambdaJob {

    /**
     * Splits each comma-separated input line into words, counts how often every word occurs and
     * prints the resulting {@code (word, count)} tuples to stdout.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> datas = Arrays.asList("aa,bb,ccc,dd,aa", "cc,dd,ee,ff,gg,aa");
        DataSource<String> dataSource = env.fromCollection(datas);
        dataSource
                // One line in -> exactly one String[] out: a 1:1 transformation, so map, not flatMap.
                .map((String input) -> input.split(","))
                // Declare the array result type instead of relying on lambda type extraction.
                .returns(Types.OBJECT_ARRAY(Types.STRING))
                .flatMap((String[] words, Collector<Tuple2<String, Integer>> collector) -> {
                    for (String rawWord : words) {
                        String word = rawWord.trim();
                        // split(",") yields "" for consecutive commas; never count empty words.
                        if (!word.isEmpty()) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // The Collector's generic Tuple2 type is erased, so it must be declared explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Group by the word (field 0) and sum the counts (field 1).
                .groupBy(0)
                .sum(1)
                // In the DataSet API print() triggers job execution itself; no env.execute() needed.
                .print();
    }
}
